A reliable forecast of bicycle traffic is an important input for planning and optimizing a city's infrastructure. In this post, I will predict the bicycle traffic in Cologne based on counting data from local bicycle counting stations.
This project was originally part of the course Machine Learning and Scientific Computing at TH Köln, and I worked on it in collaboration with my friend and colleague Felix Otte. While Felix supervised the model training and the evaluation of the models, I was responsible for data cleaning, exploration, and visualization. For this write-up, I re-ran the original trainings and added a new class-based implementation of the whole pipeline that is closer to something usable in production. As a newly added feature, I also included an implementation of prediction intervals using the MAPIE library.
This project showcases the following skills:
Understanding data sets and their issues
Achieving data consistency through cleaning and transformation
Applying appropriate data packages
Getting the most out of the data with feature engineering
Incorporating other available sources with scraping and geocoding
Efficient optimization of hyperparameters with Optuna
Handling multiple models with OOP in Python
Visualization of results.
This post is reproducible. Just follow along and execute the code chunks locally.
Introduction
To make cycling in a bigger city more attractive and safe, the infrastructure has to keep up with the demands of its users. The city of Cologne has installed several bicycle counting stations to monitor the traffic. The data is available on the open data portal of the city. As a former bike messenger, I immediately took a liking to this data set and found it fun to work with. However, the data quality is not perfect and the data is not consistent across the different stations. So we have to be a bit careful at times.
This project demonstrates forecasting future bicycle demand with an XGBoost machine learning model trained on the time series of past counts as well as weather data. We will train specific models that specialize in predicting the counts for a single location, as well as fallback/baseline versions that can be used to predict demand at new locations. With this technique we can hopefully make accurate predictions both for long-established stations and for new ones that open around the time of the prediction.
Package-wise, we will use polars and pandas for data manipulation, optuna for hyperparameter optimization, and sklearn/XGBoost for the machine learning part. Let's also define a function that downloads the bicycle counting data from the open data portal of the city of Cologne. As the tables are not entirely consistent, we need to take care of the following cleaning steps (a sketch of the function follows the list):
Ensure the German column names are correctly encoded. We have to rename them because the encoding is not consistent across files.
Replace the month names with integers to make conversion to pl.Date easier.
Extract the year from the file name and add it as a column.
Convert year and month to a pl.Date column.
Remove rows that include yearly sums.
Remove columns that have too few data points.
Unpivot the data to have a long format.
Cast the count column to pl.Int64.
Drop rows with missing values. Some files even have blank rows in between.
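A minimal sketch of such a function could look like the following. The separator, the latin-1 encoding, the month spellings, the file-name pattern for the year, and the helper name load_counting_csv are assumptions on my part; the real implementation handles a few more quirks (for example, dropping sparse columns).

import io

import polars as pl
import requests

# mapping of German month names to integers (assumed spelling in the raw files)
MONTHS = {
    "Januar": 1, "Februar": 2, "März": 3, "April": 4, "Mai": 5, "Juni": 6,
    "Juli": 7, "August": 8, "September": 9, "Oktober": 10, "November": 11, "Dezember": 12,
}

def load_counting_csv(url: str) -> pl.DataFrame:
    """Download one yearly CSV from the open data portal and clean it (sketch)."""
    # decode explicitly so the German umlauts come out right
    text = requests.get(url, timeout=30).content.decode("latin-1")
    df = pl.read_csv(io.StringIO(text), separator=";")

    # the first column holds the month names; normalize its header
    df = df.rename({df.columns[0]: "month"})

    # keep only real month rows (this also removes the yearly sum rows)
    # and map the month names to integers
    df = (
        df.filter(pl.col("month").is_in(list(MONTHS.keys())))
        .with_columns(pl.col("month").map_elements(MONTHS.get, return_dtype=pl.Int8))
    )

    # the year is encoded in the file name, e.g. ".../zaehlstellen_2022.csv" (assumption)
    year = int(url.removesuffix(".csv")[-4:])

    # long format: one row per (date, location, count)
    return (
        df.unpivot(index="month", variable_name="location", value_name="count")
        .with_columns(
            pl.date(year, pl.col("month"), 1).alias("date"),
            pl.col("count").cast(pl.Int64, strict=False),
        )
        .drop("month")
        .drop_nulls()
    )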
We can use this in a list comprehension to download and concatenate all .csv files from the portal. The links are stored in a text file, so we will read its lines first. The second part includes some feature engineering that might get moved later on.
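Under the same assumptions as above (the file name links.txt is mine), pulling everything into one frame could look roughly like this:

# read the download links (one URL per line) and concatenate all cleaned tables
with open("links.txt") as f:
    links = [line.strip() for line in f if line.strip()]

counts = pl.concat([load_counting_csv(link) for link in links])

# simple time-based features; more feature engineering happens inside the pipeline later
counts = counts.with_columns(
    pl.col("date").dt.year().alias("year"),
    pl.col("date").dt.month().alias("month"),
)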
fig, ax = plt.subplots(figsize=(8, 10))

# create lollipop chart
ax.hlines(y=yearly_sums["location"], xmin=0, xmax=yearly_sums["yearly_sum_avg"], color='navy')
ax.plot(yearly_sums["yearly_sum_avg"], yearly_sums["location"], "o", markersize=8, color='navy')

# set plot title and axis labels
ax.set_title('Avg. Yearly Sums')
ax.set_xlabel('Bicycles Counted in a Year')
ax.set_ylabel('Location')

# set y-axis limits and invert the y-axis
ax.set_ylim(ax.get_ylim()[::-1])

# hide top and right spines
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)

# add annotations for the counts
for i, (count, loc) in enumerate(zip(yearly_sums["yearly_sum_avg"], yearly_sums["location"])):
    ax.annotate(f"{int(count):}", xy=(count + 50000, i), va='center')

# x-axis ticks
ticks = [0, 500_000, 1_000_000, 1_500_000, 2_000_000]
ticks_labels = ["0", "500k", "1M", "1.5M", "2M"]
ax.set_xticks(ticks)
ax.set_xticklabels(ticks_labels, fontsize=12)
ax.grid(False)

plt.tight_layout()
plt.show()
Spotlight “Zülpicher Straße”
Zülpicher Straße is one of the first locations equipped with a counter and records the second-highest number of bicycles every year. We will plot the data with plotly to make it interactive. A trendline helps to understand the general development of the counts.
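A sketch of such a plot, assuming the cleaned counts live in the counts frame from above and that the station label matches the string used here (trendline="ols" requires statsmodels to be installed):

import plotly.express as px

zuelpicher = counts.filter(pl.col("location") == "Zülpicher Straße").sort("date")

fig = px.scatter(
    zuelpicher.to_pandas(),
    x="date",
    y="count",
    trendline="ols",  # ordinary least squares trendline
    title="Monthly Bicycle Counts - Zülpicher Straße",
    template="simple_white",
)
fig.show()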
In order to draw the counting stations on a map, we need their locations. Since open data Cologne does not provide this information, we will use the geopy library to obtain the data from OpenStreetMap, specifically via the Nominatim geocoder. We will store the data in a dictionary and convert it to a pd.DataFrame for easier handling. The call geolocator.geocode(loc + ", Köln, Germany") makes a request to the Nominatim API and returns the location of the address. So this is only approximate, as the exact counter location may differ from the address.
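The geocoding loop could look like this; the RateLimiter wrapper is my addition to stay within Nominatim's usage policy, and the user agent string is arbitrary:

import pandas as pd
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

geolocator = Nominatim(user_agent="cologne-bicycle-counters")
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)

coords = {}
for loc in counts["location"].unique().to_list():
    res = geocode(loc + ", Köln, Germany")
    if res is not None:
        coords[loc] = {"lat": res.latitude, "lon": res.longitude}

geo_df = (
    pd.DataFrame.from_dict(coords, orient="index")
    .reset_index()
    .rename(columns={"index": "location"})
)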
(During the run, the Nominatim API occasionally timed out, so urllib3 logged a handful of retry warnings for stations such as Neumarkt, Bonner Straße, Venloer Straße, A.-Schütte-Allee, A.-Silbermann-Weg, Rodenkirchener Brücke, Severinsbrücke, and Neusser Straße. All requests eventually succeeded.)
Plotting
Now let’s put everything together and make an interactive plot. We will use plotly to create a scattermapbox plot and a slider to switch between the dates.
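One way to sketch this is plotly express's scatter_mapbox with the animation_frame argument, which yields exactly the kind of date slider described above. This assumes the geo_df and counts frames from the earlier sketches; the real plot in the post may be built differently:

map_df = (
    counts.join(pl.from_pandas(geo_df), on="location", how="inner")
    .with_columns(pl.col("date").dt.strftime("%Y-%m").alias("month_str"))
    .sort("date")
    .to_pandas()
)

fig = px.scatter_mapbox(
    map_df,
    lat="lat",
    lon="lon",
    size="count",
    hover_name="location",
    animation_frame="month_str",  # adds a slider to step through the months
    zoom=11,
    mapbox_style="open-street-map",
)
fig.show()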
We will scrape the weather data from the web and extract the necessary files. The data is then loaded into a pl.DataFrame and cleaned. We mainly do some renaming and type casting. Lastly, we deselect some columns that are not needed.
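A rough sketch of this download-and-clean step is shown below. The URL, the archive layout, and the raw column names (MESS_DATUM, TEMP_MEAN) are placeholders of mine, not the real identifiers of the weather source:

import io
import zipfile

import requests

WEATHER_ZIP_URL = "https://example.org/cologne_monthly_climate.zip"  # placeholder URL

# download the archive and read the contained CSV directly from memory
archive = zipfile.ZipFile(io.BytesIO(requests.get(WEATHER_ZIP_URL, timeout=60).content))
csv_name = next(n for n in archive.namelist() if n.endswith(".csv"))

weather_df = (
    pl.read_csv(archive.open(csv_name).read(), separator=";")
    .rename({"MESS_DATUM": "date", "TEMP_MEAN": "air_temp_daymean_month_mean"})  # placeholder raw names
    .with_columns(
        # assumed raw format YYYYMM; append a day so it parses as a date
        (pl.col("date").cast(pl.Utf8) + "01").str.strptime(pl.Date, "%Y%m%d"),
        pl.col("air_temp_daymean_month_mean").cast(pl.Float64),
    )
    .select(["date", "air_temp_daymean_month_mean"])
)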
weather_detail = (
    weather_df
    .filter(pl.col("date") >= zuelpicher["date"].min())
    .filter(pl.col("date") <= zuelpicher["date"].max())
    .to_pandas()
)

fig = px.scatter(
    weather_detail,
    x="date",
    y="air_temp_daymean_month_mean",
    title="Daily Average Air Temperature (Monthly Mean) vs. Date",
    trendline="ols",
    trendline_scope="overall",
    template="simple_white"
)
fig.add_scatter(
    x=weather_detail["date"],
    y=weather_detail["air_temp_daymean_month_mean"].rolling(1).mean(),
    mode='lines',
    name="Timeseries"
)

# axes titles
fig.update_xaxes(title_text='Date')
fig.update_yaxes(title_text='Air Temp. [°C]')
fig.show()
Merging the Data
We merge the weather data with the bicycle counts using a left join, because we want to keep all of the bicycle counts. In a second step, we split the data into a training and a test set: the data from the year 2022 is used exclusively for testing. We can split the data by filtering on the date column.
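In code, the join and the date-based split might look like this (frame names as in the sketches above):

import datetime as dt

# keep every count row and attach weather features where available
full_df = counts.join(weather_df, on="date", how="left")

# everything before 2022 is used for training, the year 2022 only for testing
train_data = full_df.filter(pl.col("date") < dt.date(2022, 1, 1))
test_data = full_df.filter(pl.col("date") >= dt.date(2022, 1, 1))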
Next up is the fun part: actual machine learning! In this project, we will use the XGBoost regression model. We will use Optuna to sample hyperparameters and cross-validate them with sklearn's TimeSeriesSplit.
Feature Engineering
Let’s define some sklearn transformers to perform feature engineering. We will create a ColumnTransformer that will be used in a Pipeline.
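My illustrative version of this helper is below. The exact feature set is a judgment call; the original pipeline presumably keeps the location as a pandas categorical for XGBoost's enable_categorical flag, while this sketch sidesteps that with an OrdinalEncoder to stay short. The name make_feature_transformer is reused later by the model class.

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import FunctionTransformer, OrdinalEncoder

def expand_date(X: pd.DataFrame) -> pd.DataFrame:
    """Replace the raw date column with numeric year and month features."""
    d = pd.to_datetime(X["date"])
    return pd.DataFrame({"year": d.dt.year, "month": d.dt.month})

def make_feature_transformer(X_train: pd.DataFrame) -> ColumnTransformer:
    """Build the feature-engineering step for a given training frame (sketch)."""
    weather_cols = [c for c in X_train.columns if c not in ("date", "location")]
    return ColumnTransformer(
        transformers=[
            ("date", FunctionTransformer(expand_date), ["date"]),
            ("location", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1), ["location"]),
            ("weather", "passthrough", weather_cols),
        ]
    )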
Next, we will define a custom objective function that we want to minimize. The function is called by Optuna and returns the cross-validated RMSE of the model (sklearn's scorer reports it as a negative value, so we flip the sign). We will use the XGBRegressor from XGBoost as the model together with the transformer shown in the code block above.
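A sketch of the objective under these assumptions; the search-space bounds are illustrative, not the tuned ranges from the original project:

import numpy as np
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.pipeline import make_pipeline
from xgboost import XGBRegressor

def objective(trial, X_train, y_train):
    """Cross-validated RMSE for one hyperparameter sample; Optuna minimizes this."""
    params = {
        "max_depth": trial.suggest_int("max_depth", 2, 8),
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "min_child_weight": trial.suggest_int("min_child_weight", 1, 10),
    }
    pipe = make_pipeline(
        make_feature_transformer(X_train),
        XGBRegressor(n_estimators=500, random_state=42, **params),
    )
    # sklearn reports the *negative* RMSE, so flip the sign before minimizing
    scores = cross_val_score(
        pipe, X_train, np.asarray(y_train),
        cv=TimeSeriesSplit(n_splits=5),
        scoring="neg_root_mean_squared_error",
    )
    return -np.mean(scores)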
In order to fit and predict for multiple counter locations, we have to write a bit of custom OOP code. Due to the data structure, we want to map models that specialize in predicting the bicycle counts for a single location to their respective data. We will define a class SimpleModel that holds these models and provides methods for tuning, refitting, and predicting. "Simple" refers to the fact that we are predicting single values. We can inherit from this class later to extend it with prediction intervals.
class SimpleModel():
    def __init__(self, fixed_params=None, cache_dir='optimization_cache'):
        if fixed_params is None:
            fixed_params = {
                "n_estimators": 500,
                "enable_categorical": True,
                "random_state": 42
            }
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
        self.fixed_params = fixed_params
        self.is_fitted = False

    def set_objective(self, objective):
        self.objective = objective

    def set_fixed_params(self, fixed_params):
        self.fixed_params = fixed_params

    def set_studies(self, studies):
        self.studies = studies

    def _make_writable_string(self, location):
        return (
            location
            .lower()
            .replace(".", "")
            .replace(" ", "_")
            .replace("ü", "ue")
            .replace("ö", "oe")
            .replace("ä", "ae")
            .replace("ß", "ss")
        )

    def _get_cache_path(self, location):
        """Generate a cache file path for a specific location."""
        return os.path.join(self.cache_dir, f"{location}_study_cache.json")

    def _save_study_cache(self, location, study_data):
        """Save study results to a JSON cache file."""
        cache_path = self._get_cache_path(location)
        with open(cache_path, 'w') as f:
            json.dump(study_data, f)

    def _load_study_cache(self, location):
        """Load cached study results for a location."""
        cache_path = self._get_cache_path(location)
        if os.path.exists(cache_path):
            with open(cache_path, 'r') as f:
                return json.load(f)
        return None

    def _save_failed_cache(self, failed_studies):
        """Save failed studies to a JSON cache file."""
        cache_path = os.path.join(self.cache_dir, "failed_studies.json")
        with open(cache_path, 'w') as f:
            json.dump(failed_studies, f)

    def _load_failed_cache(self):
        """Load failed studies from a JSON cache file."""
        cache_path = os.path.join(self.cache_dir, "failed_studies.json")
        if os.path.exists(cache_path):
            with open(cache_path, 'r') as f:
                return json.load(f)
        return {}

    def tune(self, train_data, n_trials=200, use_stored=False):
        metrics = {}
        succesful_studies = {}
        failed_studies = {} if not use_stored else self._load_failed_cache()
        for location, group in train_data.group_by('location'):
            location = location[0]
            logger.info(f"Tuning {location}.")
            if location in failed_studies.keys():
                continue
            loc_str = self._make_writable_string(location)
            # Check if we can use stored results
            if use_stored:
                cached_study = self._load_study_cache(loc_str)
                if cached_study:
                    logger.info(f"Using stored hyperparameters for {location}.")
                    inner_dict = cached_study[location]
                    succesful_studies[location] = inner_dict['best_params']
                    metrics[location] = inner_dict['best_value']
                    continue
            X_train = group.drop("count").to_pandas()
            y_train = group["count"]
            study = optuna.create_study(direction="minimize")
            try:
                logger.info(f"Optimizing hyperparameters for {location}.")
                study.optimize(
                    lambda trial: self.objective(trial, X_train, y_train),
                    n_trials=n_trials
                )
                succesful_studies[location] = study.best_trial.params
                metrics[location] = study.best_trial.value
                self._save_study_cache(
                    loc_str,
                    {
                        location: {
                            'best_params': study.best_trial.params,
                            'best_value': study.best_trial.value
                        }
                    }
                )
            except ValueError as e:
                failed_studies[location] = str(e)
        self.succesful_studies = succesful_studies
        self.metrics = metrics
        # find location with the min mape in the dict
        self.best_location = pd.Series(metrics).idxmin()
        best_params = succesful_studies[self.best_location]
        for location in failed_studies.keys():
            self.succesful_studies[location] = best_params
            self._save_study_cache(
                self._make_writable_string(location),
                {
                    location: {
                        'best_params': best_params,
                        'best_value': metrics[self.best_location]
                    }
                }
            )
        self.failed_studies = failed_studies
        self._save_failed_cache(failed_studies)

    def _refit_inner(self, train_data, location):
        X_train = train_data.drop("count").to_pandas()
        y_train = train_data["count"]
        pipe = make_pipeline(
            make_feature_transformer(X_train),
            XGBRegressor(
                **self.succesful_studies[location],
                **self.fixed_params
            )
        )
        return pipe.fit(X_train, y_train)

    def refit(self, train_data):
        # first refit the baseline model
        self._baseline_pipe = self._refit_inner(train_data, self.best_location)
        self._pipeline_dict = {}
        # fit location specific models
        for location, data in self.succesful_studies.items():
            logger.info(f"Refitting model for {location}.")
            self._pipeline_dict[location] = self._refit_inner(
                train_data=train_data.filter(pl.col("location") == location),
                location=location
            )
        self.is_fitted = True

    def check_fitted(self):
        if not self.is_fitted:
            raise ValueError("Model has not been fitted yet.")
        return True

    def predict(self, X_test, loc_str=""):
        self.check_fitted()
        # return the specific model when available, otherwise the baseline model
        # and make predictions
        return self._pipeline_dict.get(loc_str, self._baseline_pipe).predict(X_test)

    def eval(self, test_data):
        self.check_fitted()
        metrics_by_location = {}
        for location, group in test_data.group_by('location'):
            location = location[0]
            X_test = group.drop("count").to_pandas()
            y_test = group["count"]
            y_pred = self.predict(X_test, location)
            metrics_by_location[location] = {
                "rmse": np.round(root_mean_squared_error(y_test, y_pred)),
                "mape": np.round(mean_absolute_percentage_error(y_test, y_pred), 2)
            }
        return (
            pl.DataFrame(
                pd.DataFrame(metrics_by_location).T
                .reset_index()
                .rename(columns={"level_0": "location", "index": "location"})
            )
            .with_columns(
                pl.col("location").is_in(self.succesful_studies.keys()).not_().alias("baseline"),
                pl.col("location").cast(pl.Categorical).alias("location"),
                pl.col("rmse").cast(pl.Int64).alias("rmse"),
            )
            .join(
                test_data
                .group_by("location")
                .agg(pl.col("count").mean().cast(pl.Int64).alias("count"))
                .filter(pl.col("location").is_in(metrics_by_location.keys())),
                on="location"
            )
            .sort("mape")
        )

    def get_pipeline(self, loc_str):
        return self._pipeline_dict.get(loc_str, self._baseline_pipe)

    def save(self, path="models/trained_ml_pipeline.pkl"):
        joblib.dump(self, path)

    def plot_predictions(self, data, loc_str):
        if not os.path.exists("images"):
            os.mkdir("images")
        self.check_fitted()
        X_test = data.filter(data["location"] == loc_str).drop("count").to_pandas()
        y_test = data.filter(data["location"] == loc_str)["count"].to_pandas()
        y_pred = self.predict(X_test, loc_str)
        combined = pd.DataFrame({"date": X_test["date"], "Actual": y_test, "Predicted": y_pred})
        fig = px.line(
            combined,
            x="date",
            y=["Actual", "Predicted"],
            title=f"Counter {loc_str}",
            template="simple_white"
        )
        fig.update_xaxes(title_text='Date')
        fig.update_yaxes(title_text='Count')
        # fig.write_image(f"images/{loc_str}.png")
        fig.show()
As all the necessary boilerplate code is already contained in the SimpleModel class, we can now use it with very little code.
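Roughly, the calls look like this (using the objective, train_data, and test_data from the sketches above):

model = SimpleModel()
model.set_objective(objective)

# hyperparameter search per location (cached to disk, so re-runs are cheap)
model.tune(train_data, n_trials=200, use_stored=True)

# refit the per-location models plus the baseline on the full training data
model.refit(train_data)

# evaluate on the held-out year and inspect a few stations
results = model.eval(test_data)
model.plot_predictions(test_data, "Severinsbrücke")
model.plot_predictions(test_data, "Bonner Straße")
model.save()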
While our model for Severinsbrücke seems to capture the seasonal pattern, it is not able to predict the peaks with the correct magnitude. The model for Bonner Straße does not capture the trend at all. In contrast, let's have a look at the best location:
To quantify the uncertainty in our predictions, we can use conformal prediction intervals. As a formal definition for general regression problems, let's assume that our training data \((X,Y)=[(x_1,y_1),\dots,(x_n,y_n)]\) is drawn i.i.d. from an unknown distribution \(P_{X,Y}\). The regression task is then defined by \(Y=\mu(X)+\epsilon\) with our model \(\mu\) and residuals \(\epsilon_i \sim P_{Y|X}\). We can define a nominal coverage level \(1-\alpha\) for the prediction intervals. So we want an interval \(\hat{C}_{n,\alpha}\) for each new prediction on the unseen feature vector \(X_{n+1}\) such that \(P[Y_{n+1}\in \hat{C}_{n,\alpha}(X_{n+1})]\ge 1-\alpha\). We will use the MAPIE package to get prediction intervals that are model agnostic. This is great, because we can plug in our existing pipeline and get the intervals with just a bit of wrapping. Now, there are different approaches to constructing prediction intervals, but since we are dealing with a time series that violates the i.i.d. assumption, we need to use ensemble batch prediction intervals (EnbPI). Read more about that here.
We will define a new class QuantileModels that inherits from SimpleModel and extends it with the methods needed to fit a MapieTimeSeriesRegressor and predict the intervals. We will use the BlockBootstrap method to resample the data and obtain the intervals. We will also define a method to plot the intervals.
This code combines some of the examples here and here.
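A condensed sketch of that class, assuming MAPIE's 0.x API (MapieTimeSeriesRegressor, BlockBootstrap) and the SimpleModel from above; the resampling parameters are illustrative, and the real implementation has additional caching and plotting logic:

import polars as pl
from mapie.regression import MapieTimeSeriesRegressor
from mapie.subsample import BlockBootstrap

class QuantileModels(SimpleModel):
    """SimpleModel extended with EnbPI prediction intervals via MAPIE."""

    def fit_mapie(self, train_data, alpha=0.05):
        """Wrap every fitted per-location pipeline in a MAPIE time series regressor."""
        self.check_fitted()
        self.alpha = alpha
        self._mapie_dict = {}
        cv = BlockBootstrap(n_resamplings=30, n_blocks=6, overlapping=False, random_state=42)
        for location, pipe in self._pipeline_dict.items():
            group = train_data.filter(pl.col("location") == location)
            X_train = group.drop("count").to_pandas()
            y_train = group["count"].to_pandas()
            mapie = MapieTimeSeriesRegressor(
                pipe, method="enbpi", cv=cv, agg_function="mean", n_jobs=-1
            )
            self._mapie_dict[location] = mapie.fit(X_train, y_train)

    def predict_intervals(self, X_test, loc_str):
        """Point predictions plus (lower, upper) bounds for one location."""
        mapie = self._mapie_dict[loc_str]
        y_pred, y_pis = mapie.predict(X_test, alpha=self.alpha, ensemble=True)
        return y_pred, y_pis[:, 0, 0], y_pis[:, 1, 0]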
Now, we can instantiate the MAPIE model and set some necessary attributes. We are basically transferring some optimization results to the new instance to not tune the models again. Instead, we are just fitting the underlying MAPIE time series models.
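Roughly like this, reusing the attributes of the already tuned model instance from above (the method name fit_mapie is from my sketch):

q_model = QuantileModels()
q_model.set_objective(objective)

# reuse the tuned hyperparameters instead of running Optuna again
q_model.succesful_studies = model.succesful_studies
q_model.metrics = model.metrics
q_model.best_location = model.best_location

# refit the point-forecast pipelines, then wrap them with MAPIE/EnbPI
q_model.refit(train_data)
q_model.fit_mapie(train_data, alpha=0.05)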
The model is fairly uncertain about the lower bound of the bicycle counts in the unseen time range. The upper bound at least includes all the actual counts. This is a good sign, but the model might be too conservative.
Extension for Real-Time Predictions
An extension to this project that allows real-time predictions is described in this follow-up post. There, I use the Flask library to create a REST API that makes predictions using our pickled models. We also use real-time weather data from the tomorrow.io API to build an inference feature vector.